package com.boilermaker.mq.mqclient;

import com.boilermaker.mq.common.BinaryTool;
import com.boilermaker.mq.common.Consumer;
import com.boilermaker.mq.common.MqException;
import com.boilermaker.mq.common.Request;
import com.boilermaker.mq.common.request_payload.*;
import com.boilermaker.mq.common.response_payload.BasicReturns;
import com.boilermaker.mq.mqserver.core.BasicProperties;
import com.boilermaker.mq.mqserver.core.Binding;
import com.boilermaker.mq.mqserver.core.ExchangeType;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Client-side handle for one logical channel multiplexed over a {@link Connection}.
 *
 * <p>Every public operation serialises an arguments object, sends it to the server as a
 * {@link Request} frame (type, length, payload) through the connection's output stream,
 * and then blocks until the response carrying the same request id (rid) has been handed
 * to {@link #notifyChannel(BasicReturns)}.
 *
 * <p>The channel does not own a socket; all network I/O goes through the owning connection.
 */
@Getter
@Slf4j
public class Channel {
    // Channel identifier (generated from a UUID by the creator of the channel).
    private final String channelId;
    // The connection this channel belongs to.
    private final Connection connection;
    // Responses delivered by notifyChannel, keyed by rid, until the waiting caller removes them.
    private final ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    // User-supplied callback for this channel. Volatile because it is written by the thread
    // calling basicConsume and exposed through the generated getter to other threads.
    private volatile Consumer consumer;

    /**
     * Creates a channel handle; no network traffic happens here.
     *
     * @param channelId  identifier of this channel
     * @param connection connection whose streams are used for all requests
     */
    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }

    /**
     * Asks the server to create this channel (request type 0x1).
     * Intended to be wrapped by the owning connection.
     *
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    protected boolean createChannel() throws IOException {
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setChannelId(channelId);
        basicArguments.setRid(generateRid());

        return sendAndWait(0x1, basicArguments.getRid(), BinaryTool.toBytes(basicArguments));
    }

    /**
     * Asks the server to close this channel (request type 0x2).
     *
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    public boolean closeChannel() throws IOException {
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setChannelId(channelId);
        basicArguments.setRid(generateRid());

        return sendAndWait(0x2, basicArguments.getRid(), BinaryTool.toBytes(basicArguments));
    }

    /**
     * Asks the server to declare an exchange (request type 0x3).
     *
     * @param exchangeName name of the exchange
     * @param exchangeType type of the exchange
     * @param durable      durable flag forwarded to the server
     * @param autoDelete   auto-delete flag forwarded to the server
     * @param arguments    extra arguments forwarded to the server
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable,
                                   boolean autoDelete, Map<String, Object> arguments) throws IOException {
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setRid(generateRid());
        exchangeDeclareArguments.setChannelId(channelId);
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        exchangeDeclareArguments.setAutoDelete(autoDelete);
        exchangeDeclareArguments.setArguments(arguments);

        return sendAndWait(0x3, exchangeDeclareArguments.getRid(),
                BinaryTool.toBytes(exchangeDeclareArguments));
    }

    /**
     * Asks the server to delete an exchange (request type 0x4).
     *
     * @param exchangeName name of the exchange to delete
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();
        exchangeDeleteArguments.setRid(generateRid());
        exchangeDeleteArguments.setChannelId(channelId);
        exchangeDeleteArguments.setExchangeName(exchangeName);

        return sendAndWait(0x4, exchangeDeleteArguments.getRid(),
                BinaryTool.toBytes(exchangeDeleteArguments));
    }

    /**
     * Asks the server to declare a queue (request type 0x5).
     *
     * @param queueName  name of the queue
     * @param durable    durable flag forwarded to the server
     * @param exclusive  exclusive flag forwarded to the server
     * @param autoDelete auto-delete flag forwarded to the server
     * @param arguments  extra arguments forwarded to the server
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    public boolean queueDeclare(String queueName, boolean durable, boolean exclusive,
                                boolean autoDelete, Map<String, Object> arguments) throws IOException {
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setRid(generateRid());
        queueDeclareArguments.setChannelId(channelId);
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setDurable(durable);
        queueDeclareArguments.setExclusive(exclusive);
        queueDeclareArguments.setAutoDelete(autoDelete);
        queueDeclareArguments.setArguments(arguments);

        return sendAndWait(0x5, queueDeclareArguments.getRid(),
                BinaryTool.toBytes(queueDeclareArguments));
    }

    /**
     * Asks the server to delete a queue (request type 0x6).
     *
     * @param queueName name of the queue to delete
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
        queueDeleteArguments.setRid(generateRid());
        queueDeleteArguments.setChannelId(channelId);
        queueDeleteArguments.setQueueName(queueName);

        return sendAndWait(0x6, queueDeleteArguments.getRid(),
                BinaryTool.toBytes(queueDeleteArguments));
    }

    /**
     * Asks the server to bind a queue to an exchange (request type 0x7).
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the exchange
     * @param bindingKey   binding key forwarded to the server
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
        QueueBindArguments queueBindArguments = new QueueBindArguments();
        queueBindArguments.setRid(generateRid());
        queueBindArguments.setChannelId(channelId);
        queueBindArguments.setQueueName(queueName);
        queueBindArguments.setExchangeName(exchangeName);
        queueBindArguments.setBindingKey(bindingKey);

        return sendAndWait(0x7, queueBindArguments.getRid(),
                BinaryTool.toBytes(queueBindArguments));
    }

    /**
     * Asks the server to remove the binding between a queue and an exchange (request type 0x8).
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the exchange
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
        QueueUnbindArguments queueUnbindArguments = new QueueUnbindArguments();
        queueUnbindArguments.setRid(generateRid());
        queueUnbindArguments.setChannelId(channelId);
        queueUnbindArguments.setQueueName(queueName);
        queueUnbindArguments.setExchangeName(exchangeName);

        return sendAndWait(0x8, queueUnbindArguments.getRid(),
                BinaryTool.toBytes(queueUnbindArguments));
    }

    /**
     * Asks the server to publish a message (request type 0x9).
     *
     * @param exchangeName    exchange to publish to
     * @param routingKey      routing key forwarded to the server
     * @param basicProperties message properties forwarded to the server
     * @param body            message body
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
        BasicPublishArguments basicPublishArguments = new BasicPublishArguments();
        basicPublishArguments.setRid(generateRid());
        basicPublishArguments.setChannelId(channelId);
        basicPublishArguments.setExchangeName(exchangeName);
        basicPublishArguments.setRoutingKey(routingKey);
        basicPublishArguments.setBasicProperties(basicProperties);
        basicPublishArguments.setBody(body);

        return sendAndWait(0x9, basicPublishArguments.getRid(),
                BinaryTool.toBytes(basicPublishArguments));
    }

    /**
     * Asks the server to subscribe this channel to a queue (request type 0xa).
     *
     * <p>The callback is kept on the client only; it is never sent over the network. It is
     * stored before the request is sent so that it is already in place when the server
     * acknowledges the subscription. If the subscription does not succeed (the server
     * reports failure or an exception is thrown) the callback is cleared again, so the
     * caller can retry instead of being rejected as a duplicate subscriber.
     *
     * @param queueName queue to subscribe to
     * @param autoAck   auto-ack flag forwarded to the server
     * @param consumer  callback to register for this channel
     * @return {@code true} if the server reported success
     * @throws MqException if this channel already has a consumer registered
     * @throws IOException if serialising or sending the request fails
     */
    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
        if (this.consumer != null) {
            throw new MqException("不可重复订阅");
        }
        this.consumer = consumer;

        boolean subscribed = false;
        try {
            BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
            basicConsumeArguments.setRid(generateRid());
            basicConsumeArguments.setChannelId(channelId);
            // The channel id doubles as the consumer tag.
            basicConsumeArguments.setConsumeTag(channelId);
            basicConsumeArguments.setQueueName(queueName);
            basicConsumeArguments.setAutoAck(autoAck);

            subscribed = sendAndWait(0xa, basicConsumeArguments.getRid(),
                    BinaryTool.toBytes(basicConsumeArguments));
            return subscribed;
        } finally {
            if (!subscribed) {
                // Roll back so a failed attempt does not permanently block re-subscription.
                this.consumer = null;
            }
        }
    }

    /**
     * Sends an acknowledgement for one message to the server (request type 0xb).
     *
     * @param queueName queue the message was delivered from
     * @param messageId id of the message being acknowledged
     * @return {@code true} if the server reported success
     * @throws IOException if serialising or sending the request fails
     */
    public boolean basicAck(String queueName, String messageId) throws IOException {
        BasicAckArguments basicAckArguments = new BasicAckArguments();
        basicAckArguments.setRid(generateRid());
        basicAckArguments.setChannelId(channelId);
        basicAckArguments.setQueueName(queueName);
        basicAckArguments.setMessageId(messageId);

        return sendAndWait(0xb, basicAckArguments.getRid(),
                BinaryTool.toBytes(basicAckArguments));
    }

    // Wraps the payload in a Request frame, sends it, and blocks until the response for rid arrives.
    private boolean sendAndWait(int type, String rid, byte[] payload) throws IOException {
        Request request = new Request();
        request.setType(type);
        request.setLength(payload.length);
        request.setPayload(payload);

        writeRequest(request);
        return waitResult(rid).isOk();
    }

    // Writes one request frame (type, length, payload) to the connection's output stream.
    private void writeRequest(Request request) throws IOException {
        // The stream belongs to the connection rather than to this channel, so the three
        // writes are done under the stream's monitor to keep one frame contiguous even if
        // several threads send through the same stream.
        synchronized (connection.getDataOutputStream()) {
            connection.getDataOutputStream().writeInt(request.getType());
            connection.getDataOutputStream().writeInt(request.getLength());
            connection.getDataOutputStream().write(request.getPayload());
            connection.getDataOutputStream().flush();
        }
        log.info("请求已发送, type={}, length={}", request.getType(), request.getLength());
    }

    // Blocks until the response for the given rid is available, then removes and returns it.
    private BasicReturns waitResult(String rid) {
        boolean interrupted = false;
        try {
            synchronized (this) {
                BasicReturns basicReturns;
                // The lookup happens while holding the monitor that notifyChannel notifies on,
                // so a response stored between the lookup and wait() cannot be missed.
                while ((basicReturns = basicReturnsMap.remove(rid)) == null) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        // Keep waiting for the response (as before), but remember the
                        // interruption so it can be restored for the caller.
                        interrupted = true;
                        log.warn("等待响应时线程被中断, rid={}", rid, e);
                    }
                }
                return basicReturns;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores a response and wakes every thread blocked in this channel waiting for a response.
     *
     * @param basicReturns response received from the server; its rid selects the waiting caller
     */
    protected void notifyChannel(BasicReturns basicReturns) {
        synchronized (this) {
            basicReturnsMap.put(basicReturns.getRid(), basicReturns);
            // Several callers may be waiting on different rids; wake all and let each re-check.
            notifyAll();
        }
    }

    // Generates a unique request id of the form "R-<uuid>".
    private String generateRid() {
        return "R-" + UUID.randomUUID();
    }
}
